package org.example.dobs.demo.flink.wc.source.custom;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
 * Custom non-parallel source (parallelism 1) that emits one {@code Long} per second.
 *
 * <p>Note: the element type ({@code Long}) must be declared via the
 * {@link SourceFunction} type parameter so Flink can derive the stream's type.
 */
public class MySourceFunction_V1 implements SourceFunction<Long> {
    // Must be volatile: cancel() is called from a different thread than run().
    // Without volatile there is no happens-before edge, and the emit loop in
    // run() may never observe the flag change — the source would never stop.
    private volatile boolean isRunning = true;
    // Monotonically increasing value emitted by the source; only touched by run().
    private long number = 1L;

    /**
     * Emits an increasing {@code Long} once per second until {@link #cancel()}
     * flips the running flag (or the enclosing task interrupts the sleep).
     *
     * @param sourceContext context used to emit elements downstream
     * @throws Exception if interrupted while sleeping or if emission fails
     */
    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(number++);
            Thread.sleep(1000);
        }
    }

    /** Signals the run loop to terminate; invoked by Flink on a separate thread. */
    @Override
    public void cancel() {
        isRunning = false;
    }


    /** Demo driver: source -> map (log each element) -> filter (keep evens) -> print. */
    public static void main(String[] args) throws Exception {
        // step1: obtain the Flink streaming execution environment
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // step2: input — attach the custom one-element-per-second source
        DataStreamSource<Long> numberStream = see.addSource(new MySourceFunction_V1());
        // step3: transform — log every element, then keep only even values
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {

            @Override
            public Long map(Long aLong) throws Exception {
                System.out.println("输出元素： " + aLong);
                return aLong;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong % 2 == 0;
            }
        });
        // step4: output — print with parallelism 1 so the console order is stable
        filterDataStream.print().setParallelism(1);
        // finally run or execute flink job (triggers the lazy pipeline)
        see.execute("StreamingDemoWithCustomSource");
    }
}
